package com.powernode.demo;

import com.google.common.base.Charsets;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import com.powernode.constant.MqConstant;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.junit.Test;

import java.util.List;
import java.util.UUID;

/**
 * Duplicate-consumption demo.
 *
 * <p>The producer test sends one message whose key is a random UUID; the consumer test
 * uses that key together with a Bloom filter to skip messages it has already handled.
 *
 * @author jiaxXM
 * @date 2024/7/9
 */
public class RepeatTest {

    /** Topic shared by the demo producer and the demo consumer. */
    private static final String TOPIC = "TopicRepeat";

    /**
     * Filter holding the keys of messages that have already been processed.
     *
     * <p>In a Spring Boot project this could be registered with {@code @Bean} so that a
     * single instance is shared by the whole container.
     *
     * <p>Created for 100 expected insertions with a 1% false-positive probability: a
     * positive answer from {@code mightContain} therefore only means "probably seen", so
     * a small fraction of never-seen keys will be skipped as if they were duplicates.
     */
    BloomFilter<String> bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), 100, 0.01);

    /**
     * Sends a single test message to {@link #TOPIC}, using a random UUID as the message key
     * so that consumers can treat the key as a unique identifier.
     *
     * @throws Exception if the producer cannot be started or the send fails
     */
    @Test
    public void testRepeatProducer() throws Exception {
        // Create a default producer.
        DefaultMQProducer producer = new DefaultMQProducer("repeat-test-group");
        // Point it at the name server.
        producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
        // Start the instance.
        producer.start();
        try {
            // A custom key can serve as the unique identifier of the message.
            String keyId = UUID.randomUUID().toString();
            System.out.println(keyId);
            // Encode explicitly as UTF-8 so the payload does not depend on the platform charset.
            byte[] body = "我是一个测试消息".getBytes(Charsets.UTF_8);
            Message msg = new Message(TOPIC, "tagA", keyId, body);
            SendResult send = producer.send(msg);
            System.out.println(send);
        } finally {
            // Always release the instance, even when sending fails.
            producer.shutdown();
        }
    }

    /**
     * Starts a broadcasting push consumer on {@link #TOPIC} that skips every message whose
     * key is already recorded in {@link #bloomFilter}, then blocks on standard input to
     * keep the consumer running.
     *
     * @throws Exception if the consumer cannot be started or reading standard input fails
     */
    @Test
    public void testRepeatConsumer() throws Exception {
        // Create the default consumer group.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group");
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // Point it at the name server.
        consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
        // Subscribe to the topic; the tag expression defaults to "*".
        consumer.subscribe(TOPIC, "*");
        // MessageListenerConcurrently consumes concurrently; the original author notes the
        // default is 20 threads, see consumer.setConsumeThreadMax().
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                // Handle every delivered message, not only the first one, so nothing is
                // acknowledged without having been looked at.
                for (MessageExt messageExt : msgs) {
                    consumeIfNotSeen(messageExt);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        try {
            // Block so the consumer keeps running until input arrives on stdin.
            System.in.read();
        } finally {
            consumer.shutdown();
        }
    }

    /**
     * Runs the business logic for one message unless its key is already in the filter.
     *
     * <p>The membership check and the later {@code put} are two separate calls, so two
     * listener threads receiving the same key at the same moment can both pass the check.
     *
     * @param messageExt the delivered message
     */
    private void consumeIfNotSeen(MessageExt messageExt) {
        String keys = messageExt.getKeys();
        // A message without a key cannot be de-duplicated (and a null key would make the
        // filter throw), so it is processed unconditionally.
        boolean hasKey = keys != null && !keys.isEmpty();
        if (hasKey && bloomFilter.mightContain(keys)) {
            // Probably processed before: skip the business logic.
            return;
        }
        // Process the business logic here, then record the key in the filter.
        // do sth...
        if (hasKey) {
            bloomFilter.put(keys);
        }
    }
}
